package com.example.networkflow;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.net.URL;
import java.util.Random;

/**
 * Flink job that reads {@code /UserBehavior.csv} from the classpath, keeps only page-view
 * ({@code "pv"}) records and prints the total page-view count of every one-hour event-time
 * tumbling window.
 */
public class PageView {

    /** Number of salt buckets the "pv" key is spread over before the first per-window count. */
    private static final int SALT_BUCKETS = 10;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        String inputPath = resolveResourcePath("/UserBehavior.csv");
        System.out.println(">>" + inputPath);
        DataStream<String> inputStream = env.readTextFile(inputPath);
        DataStream<UserBehavior> behaviorDataStream = inputStream
                .map(PageView::parseUserBehavior)
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                    @Override
                    public long extractAscendingTimestamp(UserBehavior userBehavior) {
                        // The CSV timestamp is scaled by 1000 (treated as seconds) to obtain
                        // the millisecond event time Flink expects.
                        return userBehavior.getTimestamp() * 1000L;
                    }
                });

        // Stage 1: salt the single "pv" key into SALT_BUCKETS keys so the counting load is
        // spread across keys (avoids data skew on one key), then count per bucket per 1h window.
        // Note: with parallelism 1, as configured above, salting only matters once the job
        // runs with a higher parallelism.
        SingleOutputStreamOperator<Tuple2<String, Long>> partialPvCounts = behaviorDataStream
                .filter(userBehavior -> "pv".equalsIgnoreCase(userBehavior.getBehavior()))
                .map(new MapFunction<UserBehavior, Tuple2<String, Long>>() {
                    // One Random per function instance instead of one allocation per record.
                    private final Random random = new Random();

                    @Override
                    public Tuple2<String, Long> map(UserBehavior userBehavior) {
                        return new Tuple2<>("pv_" + random.nextInt(SALT_BUCKETS), 1L);
                    }
                })
                .keyBy(value -> value.f0)
                .timeWindow(Time.hours(1)) // 1-hour tumbling event-time window
                .sum(1);

        // Stage 2: merge the per-bucket partial counts of each window into one total.
        // Window results are emitted with timestamp window.maxTimestamp(), so an identically
        // sized tumbling window downstream collects exactly the partials of the same hour.
        SingleOutputStreamOperator<Tuple2<String, Long>> pvSingleOutputStreamOperator = partialPvCounts
                .map(new MapFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Tuple2<String, Long> partial) {
                        // Drop the salt so all partials of a window share one key.
                        return new Tuple2<>("pv", partial.f1);
                    }
                })
                .keyBy(value -> value.f0)
                .timeWindow(Time.hours(1))
                .sum(1);

        pvSingleOutputStreamOperator.print();

        env.execute("job pageview");
    }

    /**
     * Resolves a classpath resource to a local file-system path usable by {@code readTextFile}.
     *
     * @param resource absolute classpath resource name, e.g. {@code "/UserBehavior.csv"}
     * @return the decoded file-system path of the resource
     * @throws IllegalStateException if the resource is missing, or its URI has no path
     *     component (an opaque URI such as a resource packaged inside a jar)
     * @throws java.net.URISyntaxException if the resource URL cannot be converted to a URI
     */
    private static String resolveResourcePath(String resource) throws java.net.URISyntaxException {
        URL url = PageView.class.getResource(resource);
        if (url == null) {
            throw new IllegalStateException("Classpath resource not found: " + resource);
        }
        // URL.getPath() keeps percent-escapes (e.g. %20 for a space), which do not name a real
        // file; URI.getPath() returns the decoded path instead.
        String path = url.toURI().getPath();
        if (path == null) {
            throw new IllegalStateException("Classpath resource is not a plain file: " + url);
        }
        return path;
    }

    /**
     * Parses one CSV line {@code userId,itemId,categoryId,behavior,timestamp} into a
     * {@link UserBehavior}.
     *
     * @param line one line of the input file
     * @return the populated {@link UserBehavior}
     * @throws IllegalArgumentException if the line has fewer than five comma-separated fields
     * @throws NumberFormatException if a numeric field cannot be parsed
     */
    private static UserBehavior parseUserBehavior(String line) {
        String[] fields = line.split(",");
        if (fields.length < 5) {
            throw new IllegalArgumentException("Expected 5 fields in UserBehavior line: " + line);
        }
        UserBehavior userBehavior = new UserBehavior();
        userBehavior.setUserId(Long.valueOf(fields[0]));
        userBehavior.setItemId(Long.valueOf(fields[1]));
        userBehavior.setCategoryId(Integer.valueOf(fields[2]));
        userBehavior.setBehavior(fields[3]);
        userBehavior.setTimestamp(Long.valueOf(fields[4]));
        return userBehavior;
    }

    /** Placeholder: currently empty and not used by {@link #main}. */
    public static class ItemCountAgg {

    }

    /** Placeholder: currently empty and not used by {@link #main}. */
    public static class WindowItemCountResult {

    }
}
